{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "15c4bd28",
   "metadata": {},
   "source": [
    "# How to stream arbitrary nested content\n",
    "\n",
    "The most common use case for streaming from inside a node is to stream LLM tokens, but you may have other long-running streaming functions you wish to render for the user. While individual nodes in LangGraph cannot return generators (since they are executed to completion for each [superstep](https://langchain-ai.github.io/langgraph/concepts/#core-design)), we can still stream arbitrary custom functions from within a node using a similar tact and calling `astream_events` on the graph.\n",
    "\n",
    "We do so using a [RunnableGenerator](https://api.python.langchain.com/en/latest/runnables/langchain_core.runnables.base.RunnableGenerator.html#langchain-core-runnables-base-runnablegenerator) (which your function will automatically behave as if wrapped as a [RunnableLambda](https://api.python.langchain.com/en/latest/runnables/langchain_core.runnables.base.RunnableLambda.html#langchain_core.runnables.base.RunnableLambda)).\n",
    "\n",
    "Below is a simple toy example."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "id": "486a01a0",
   "metadata": {},
   "outputs": [],
   "source": [
    "from langchain_core.messages import AIMessage\n",
    "from langchain_core.runnables import RunnableGenerator\n",
    "from langchain_core.runnables import RunnableConfig\n",
    "\n",
    "from langgraph.graph import START, StateGraph, MessagesState, END\n",
    "\n",
    "# Define a new graph\n",
    "workflow = StateGraph(MessagesState)\n",
    "\n",
    "\n",
    "async def my_generator(state: MessagesState):\n",
    "    messages = [\n",
    "        \"Four\",\n",
    "        \"score\",\n",
    "        \"and\",\n",
    "        \"seven\",\n",
    "        \"years\",\n",
    "        \"ago\",\n",
    "        \"our\",\n",
    "        \"fathers\",\n",
    "        \"...\",\n",
    "    ]\n",
    "    for message in messages:\n",
    "        yield message\n",
    "\n",
    "\n",
    "async def my_node(state: MessagesState, config: RunnableConfig):\n",
    "    messages = []\n",
    "    # Tagging a node makes it easy to filter out which events to include in your stream\n",
    "    # It's completely optional, but useful if you have many functions with similar names\n",
    "    gen = RunnableGenerator(my_generator).with_config(tags=[\"should_stream\"])\n",
    "    async for message in gen.astream(state):\n",
    "        messages.append(message)\n",
    "    return {\"messages\": [AIMessage(content=\" \".join(messages))]}\n",
    "\n",
    "\n",
    "workflow.add_node(\"model\", my_node)\n",
    "workflow.add_edge(START, \"model\")\n",
    "workflow.add_edge(\"model\", END)\n",
    "app = workflow.compile()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "id": "ce773a40",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "{'chunk': 'Four'}|{'chunk': 'score'}|{'chunk': 'and'}|{'chunk': 'seven'}|{'chunk': 'years'}|{'chunk': 'ago'}|{'chunk': 'our'}|{'chunk': 'fathers'}|{'chunk': '...'}|"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "/Users/harrisonchase/.pyenv/versions/3.11.1/envs/permchain/lib/python3.11/site-packages/langchain_core/_api/beta_decorator.py:87: LangChainBetaWarning: This API is in beta and may change in the future.\n",
      "  warn_beta(\n"
     ]
    }
   ],
   "source": [
    "from langchain_core.messages import HumanMessage\n",
    "\n",
    "inputs = [HumanMessage(content=\"What are you thinking about?\")]\n",
    "async for event in app.astream_events({\"messages\": inputs}, version=\"v1\"):\n",
    "    kind = event[\"event\"]\n",
    "    tags = event.get(\"tags\", [])\n",
    "    if kind == \"on_chain_stream\" and \"should_stream\" in tags:\n",
    "        data = event[\"data\"]\n",
    "        if data:\n",
    "            # Empty content in the context of OpenAI or Anthropic usually means\n",
    "            # that the model is asking for a tool to be invoked.\n",
    "            # So we only print non-empty content\n",
    "            print(data, end=\"|\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "2c7b7902-2d80-4bf9-91c1-737b749e58a3",
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.11.1"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
